package com.xiaofan

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, StateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import java.time.Duration
import java.util
import scala.collection.mutable.ListBuffer

/**
 * Input record describing a single login attempt.
 *
 * @param userId    id of the user who attempted to log in
 * @param ip        source address of the attempt, kept as the raw text from the input
 * @param eventType outcome of the attempt; the job treats the literal "fail" as a failed login
 *                  and any other value as a success
 * @param timestamp time of the attempt; the job multiplies it by 1000 to obtain milliseconds,
 *                  i.e. it is handled as a value in seconds
 */
case class LoginEvent(
  userId: Long,
  ip: String,
  eventType: String,
  timestamp: Long
)

/**
 * Output record emitted when repeated login failures are detected for a user.
 *
 * @param userId        id of the user the warning is about
 * @param firstFailTime timestamp of the first login failure covered by this warning
 * @param lastFailTime  timestamp of the last login failure covered by this warning
 * @param warningMsg    human-readable description of the warning
 */
case class LoginFailWarning(
  userId: Long,
  firstFailTime: Long,
  lastFailTime: Long,
  warningMsg: String
)

/**
 * Entry point of the login-failure detection job.
 *
 * Reads login records from a CSV text file, converts every line into a [[LoginEvent]],
 * assigns event-time timestamps and watermarks, keys the stream by user id and prints the
 * [[LoginFailWarning]]s produced by [[LoginFailWarningResult]].
 */
object LoginFail {

  /** Input file used when no path is supplied on the command line. */
  private val DefaultInputPath: String =
    "D:\\big-data\\code\\UserBehaviorAnalysis\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv"

  /**
   * Builds and runs the job.
   *
   * @param args optional program arguments; when present, `args(0)` is used as the path of the
   *             input CSV file, otherwise [[DefaultInputPath]] is read
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Allow the input location to be overridden so the job is not tied to one machine's
    // directory layout; without arguments the original path is still used.
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    val dataStream: DataStream[LoginEvent] = inputStream
      .map {
        data => {
          // Expected line layout: userId,ip,eventType,timestamp
          val arr: Array[String] = data.split(",")
          LoginEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
        }
      }
      .assignTimestampsAndWatermarks {
        WatermarkStrategy
          // Tolerate events that arrive up to 3 seconds out of order.
          .forBoundedOutOfOrderness[LoginEvent](Duration.ofSeconds(3))
          .withTimestampAssigner(new SerializableTimestampAssigner[LoginEvent] {
            // The record timestamp is scaled by 1000 to get the milliseconds Flink expects.
            override def extractTimestamp(element: LoginEvent, recordTimestamp: Long): Long = element.timestamp * 1000L
          })
      }

    // Detect repeated login failures per user and emit a warning for them
    // (the argument 2 is the minimum number of failures that triggers a warning).
    val loginFailWarningStream: DataStream[LoginFailWarning] = dataStream
      .keyBy(_.userId)
      .process(new LoginFailWarningResult(2))

    loginFailWarningStream.print()
    env.execute("login fail detect job")
  }
}


/**
 * Keyed process function that raises a [[LoginFailWarning]] when a user accumulates at least
 * `failTimes` failed logins before the detection timer fires.
 *
 * Per key it buffers every failed login in list state. The first buffered failure registers an
 * event-time timer `windowSeconds` after that failure; a successful login cancels the timer and
 * discards the buffer. When the timer fires, the buffered failures are counted and, if there are
 * enough of them, one warning is emitted; the state is then reset.
 *
 * NOTE(review): the timer fires on watermark progress, so failures that arrive before the
 * watermark passes the timer are buffered and counted even if their own timestamp lies beyond
 * the window — confirm whether that is the intended behaviour.
 *
 * @param failTimes     minimum number of buffered failures required to emit a warning
 * @param windowSeconds length of the detection window in seconds, measured from the first
 *                      buffered failure; defaults to 2, the value that was previously fixed
 */
class LoginFailWarningResult(failTimes: Int, windowSeconds: Long = 2L) extends KeyedProcessFunction[Long, LoginEvent, LoginFailWarning] {

  // State: all failed logins buffered for the current key, and the timestamp of the
  // timer that is currently registered for it.
  lazy val loginFailListState: ListState[LoginEvent] = getRuntimeContext.getListState(new ListStateDescriptor[LoginEvent]("loginFail-list", classOf[LoginEvent]))
  lazy val timeTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-ts", classOf[Long]))

  /**
   * Buffers failed logins and manages the detection timer.
   *
   * @param value the login event being processed
   * @param ctx   context giving access to the timer service
   * @param out   collector for warnings (not used here; warnings are emitted from `onTimer`)
   */
  override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {
    if (value.eventType == "fail") {
      loginFailListState.add(value)

      // A stored timer timestamp of 0 is treated as "no timer registered yet": start the
      // detection window at this failure.
      if (timeTsState.value() == 0) {
        val ts: Long = value.timestamp * 1000L + windowSeconds * 1000L
        ctx.timerService().registerEventTimeTimer(ts)
        timeTsState.update(ts)
      }
    } else {
      // Any non-failure event ends the streak: cancel the pending timer and start over.
      ctx.timerService().deleteEventTimeTimer(timeTsState.value())
      timeTsState.clear()
      loginFailListState.clear()
    }
  }

  /**
   * Called when the detection timer fires: emits a warning if enough failures were buffered,
   * then resets the state for the current key.
   *
   * @param timestamp the timestamp of the firing timer
   * @param ctx       timer context
   * @param out       collector the warning is emitted to
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#OnTimerContext, out: Collector[LoginFailWarning]): Unit = {

    // Copy the buffered failures out of the state's Java iterable.
    val bufferedFailures: ListBuffer[LoginEvent] = ListBuffer[LoginEvent]()
    val iter: util.Iterator[LoginEvent] = loginFailListState.get.iterator()
    while (iter.hasNext) {
      bufferedFailures += iter.next()
    }

    // The nonEmpty guard keeps head/min/max safe even when failTimes is 0 or negative.
    if (bufferedFailures.nonEmpty && bufferedFailures.length >= failTimes) {
      // Events are buffered in arrival order, which may differ from timestamp order when
      // input is out of order, so derive the first/last failure time from the timestamps.
      val failTimestamps: ListBuffer[Long] = bufferedFailures.map(_.timestamp)
      out.collect(LoginFailWarning(
        bufferedFailures.head.userId,
        failTimestamps.min,
        failTimestamps.max,
        s"login fail in ${windowSeconds}s for ${bufferedFailures.length} times."
      ))
    }

    loginFailListState.clear()
    timeTsState.clear()
  }
}




























